Shared Memory and IPC
The Benchmark That Changes How You Think About IPC
Consider passing a 500 MB NumPy array from a data loader process to a model inference process. There are two approaches:
# Approach 1: Pickle serialisation through a multiprocessing.Queue
import multiprocessing
import time
import numpy as np

def producer_pickle(q):
    arr = np.random.rand(500 * 1024 * 1024 // 8)  # 500 MB of float64
    t0 = time.perf_counter()
    q.put(arr)  # returns almost at once: a background feeder thread pickles it and writes it to the pipe
    print(f"Pickle put (enqueue only): {time.perf_counter() - t0:.3f}s")

def consumer_pickle(q):
    t0 = time.perf_counter()
    arr = q.get()  # waits for generation + pickling + pipe transfer + unpickling
    print(f"Pickle get: {time.perf_counter() - t0:.3f}s")
    print(f"Array shape: {arr.shape}")

if __name__ == "__main__":  # required with the "spawn" start method (the default on macOS and Windows)
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer_pickle, args=(q,))
    p2 = multiprocessing.Process(target=consumer_pickle, args=(q,))
    p1.start(); p2.start()
    p1.join(); p2.join()
# Typical result: ~8 seconds end to end (serialize to bytes, copy through the pipe's kernel buffer, deserialize)
# Approach 2: POSIX shared memory - no copies across the process boundary
from multiprocessing.shared_memory import SharedMemory
import multiprocessing
import time
import numpy as np

SHAPE = (500 * 1024 * 1024 // 8,)  # 500 MB as float64
DTYPE = np.float64

def producer_shm(shm_name: str, ready_event, done_event):
    # Attach to the existing shared memory segment
    shm = SharedMemory(name=shm_name)
    arr = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    t0 = time.perf_counter()
    # Write data straight into the shared pages (no temporary array, no copy)
    np.random.default_rng().random(out=arr)
    print(f"SHM write: {time.perf_counter() - t0:.3f}s")
    ready_event.set()  # signal: data is ready
    done_event.wait()  # wait for the consumer to finish
    del arr            # drop the NumPy view first, or close() raises BufferError
    shm.close()

def consumer_shm(shm_name: str, ready_event, done_event):
    ready_event.wait()  # wait for the producer
    # Attach to shared memory
    shm = SharedMemory(name=shm_name)
    arr = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    t0 = time.perf_counter()
    # Read data straight from the shared pages (no copy)
    result = arr.sum()
    print(f"SHM read: {time.perf_counter() - t0:.3f}s")
    print(f"Sum: {result:.2f}")
    done_event.set()
    del arr
    shm.close()

if __name__ == "__main__":
    # Allocate shared memory
    nbytes = int(np.prod(SHAPE)) * np.dtype(DTYPE).itemsize
    shm = SharedMemory(create=True, size=nbytes)
    ready = multiprocessing.Event()
    done = multiprocessing.Event()
    p1 = multiprocessing.Process(target=producer_shm, args=(shm.name, ready, done))
    p2 = multiprocessing.Process(target=consumer_shm, args=(shm.name, ready, done))
    p1.start(); p2.start()
    p1.join(); p2.join()
    shm.close()
    shm.unlink()  # delete the /dev/shm entry
# Typical result: no serialization and no transfer step; the timings above measure only
# generating and summing 500 MB in place, which is bounded by memory bandwidth, not IPC
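The results:
- Pickle through Queue: ~8 seconds (serialize → write to pipe → read from pipe → deserialize)
- Shared memory: no transfer step at all. Both processes touch the same physical pages, so what remains is page faults on first access plus ordinary memory bandwidth; at the ~20 GB/s listed for shared memory in the table below, streaming 500 MB once takes roughly 25 ms.
The IPC overhead drops from seconds to effectively zero, and the end-to-end gap is typically two orders of magnitude or more. For real-time ML inference pipelines, model serving, and video processing, this difference is the boundary between feasible and impossible.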
IPC Mechanisms: The Decision Matrix
Mechanism | Latency | Throughput | Persistence | Processes | Notes
───────────────────┼───────────┼────────────┼─────────────┼───────────┼──────────────────
Anonymous pipe | ~1 μs | ~2 GB/s | No | Parent+child | One-way, kernel buf
Named pipe (FIFO) | ~2 μs | ~1.5 GB/s | Filesystem | Unrelated | One-way, blocks
Unix domain socket | ~1 μs | ~2.1 GB/s | No | Any local | Bidirectional
TCP loopback | ~4 μs | ~0.9 GB/s | No | Any | Cross-host capable
Shared memory | ~0.1 μs | ~20 GB/s | /dev/shm | Any local | Requires sync
Message queue | ~5 μs | ~200 MB/s | Optional | Any local | Built-in framing
mmap(MAP_SHARED) | ~0.1 μs | ~20 GB/s | File | Any local | Requires sync
Choose based on:
- Data size: < 64 KB → pipes/sockets; > 1 MB → shared memory
- Process relationship: parent/child only → anonymous pipe; unrelated → named pipe or socket
- Directionality: one-way → pipe; bidirectional → socket or shared memory + semaphores
- Message framing: pipes are streams (no boundaries) → add length prefix; message queues have built-in framing
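Because pipes and stream sockets deliver a byte stream with no message boundaries, the usual remedy is a length prefix. A minimal sketch, assuming a 4-byte big-endian length header (an arbitrary choice, not a standard) and using socket.socketpair() inside one process as a stand-in for two communicating processes; send_msg, recv_exact, and recv_msg are illustrative helper names:

```python
import socket
import struct

# Assumed wire format: 4-byte big-endian unsigned length, then the payload
HEADER = struct.Struct("!I")

def send_msg(sock: socket.socket, payload: bytes) -> None:
    """Prefix the payload with its length so the reader can find the message boundary."""
    sock.sendall(HEADER.pack(len(payload)) + payload)

def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes; a stream may hand them over in several pieces."""
    chunks = []
    remaining = n
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise EOFError("peer closed the connection mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

def recv_msg(sock: socket.socket) -> bytes:
    """Read one length-prefixed message."""
    (length,) = HEADER.unpack(recv_exact(sock, HEADER.size))
    return recv_exact(sock, length)

# A connected pair of Unix domain sockets stands in for two processes
a, b = socket.socketpair()
for text in (b"first", b"", b"third message"):
    send_msg(a, text)
received = [recv_msg(b) for _ in range(3)]
a.close()
b.close()
print(received)  # [b'first', b'', b'third message']
```

recv_exact loops because a single recv() may return fewer bytes than requested; the same framing works over os.pipe() file descriptors with os.read()/os.write().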
Pipes: os.pipe()
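An anonymous pipe is the simplest IPC mechanism: a unidirectional byte stream backed by a bounded kernel buffer (64 KB by default on Linux, resizable with the Linux-specific F_SETPIPE_SZ fcntl). Data written to the write end is read from the read end in FIFO order; when the buffer is full, write() blocks until the reader drains it.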
import os
import time

# os.pipe() returns (read_fd, write_fd)
r_fd, w_fd = os.pipe()

# Fork a child (Unix only); the child writes, the parent reads
pid = os.fork()
if pid == 0:
    # Child: close the read end, write data
    os.close(r_fd)
    for i in range(10):
        message = f"Message {i:03d} from child\n".encode()
        os.write(w_fd, message)
        time.sleep(0.01)
    os.close(w_fd)  # EOF signal - the parent's read() will return b""
    os._exit(0)
else:
    # Parent: close the write end, read data
    os.close(w_fd)
    while True:
        data = os.read(r_fd, 4096)
        if not data:
            break  # EOF: the child closed its write end
        print(f"Parent received: {data.decode()}", end="")
    os.close(r_fd)
    os.waitpid(pid, 0)
    print("Done")
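The ~64 KB figure above can be checked directly. A small sketch (Unix only; pipe_capacity is a hypothetical helper) that fills an empty pipe with non-blocking writes until the kernel reports it full:

```python
import os

def pipe_capacity(chunk: int = 1024) -> int:
    """Fill an empty pipe with non-blocking writes until the kernel refuses more."""
    r_fd, w_fd = os.pipe()
    os.set_blocking(w_fd, False)  # a full pipe now raises instead of blocking
    written = 0
    try:
        while True:
            try:
                written += os.write(w_fd, b"\0" * chunk)
            except BlockingIOError:
                break  # EAGAIN: the pipe buffer is full
    finally:
        os.close(r_fd)
        os.close(w_fd)
    return written

capacity = pipe_capacity()
print(f"Pipe capacity: {capacity} bytes")  # typically 65536 on Linux with default settings
```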
Non-Blocking Pipe Reads with select
A blocking read on an empty pipe waits until data arrives or every write end is closed; if neither happens, the process hangs. Use select with a timeout:
import os
import select
import fcntl

r_fd, w_fd = os.pipe()

# Make the read end non-blocking: if select() ever reports readiness spuriously,
# os.read() raises BlockingIOError instead of hanging
flags = fcntl.fcntl(r_fd, fcntl.F_GETFL)
fcntl.fcntl(r_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

# Write some data
os.write(w_fd, b"hello from pipe")

# Wait up to 1 second for the read end to become readable
readable, _, _ = select.select([r_fd], [], [], 1.0)
if readable:
    data = os.read(r_fd, 4096)
    print(f"Read {len(data)} bytes: {data}")
else:
    print("Timeout: no data in pipe")

os.close(r_fd)
os.close(w_fd)
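When a reader needs to consume everything until EOF without risking an indefinite hang, the standard-library selectors module wraps the same readiness check in a reusable loop. A sketch under that assumption; read_until_eof is an illustrative name and the 1-second default timeout is arbitrary:

```python
import os
import selectors

def read_until_eof(fd: int, timeout: float = 1.0) -> bytes:
    """Collect everything from a pipe's read end; give up if it stays silent for `timeout` seconds."""
    sel = selectors.DefaultSelector()
    sel.register(fd, selectors.EVENT_READ)
    chunks = []
    try:
        while True:
            if not sel.select(timeout):  # empty list: nothing became readable in time
                raise TimeoutError(f"no data for {timeout}s")
            data = os.read(fd, 4096)
            if not data:  # b"" means every write end is closed
                break
            chunks.append(data)
    finally:
        sel.close()
    return b"".join(chunks)

r_fd, w_fd = os.pipe()
os.write(w_fd, b"hello ")
os.write(w_fd, b"from pipe")
os.close(w_fd)  # closing the write end lets the reader see EOF
result = read_until_eof(r_fd)
os.close(r_fd)
print(result)  # b'hello from pipe'
```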
Named Pipes (FIFOs)
A FIFO is a pipe that has a name in the filesystem. Unrelated processes (not in a parent/child relationship) can use FIFOs to communicate. Like anonymous pipes, FIFOs are unidirectional byte streams.
import os
import threading
import time

FIFO_PATH = "/tmp/myapp_fifo"

def create_fifo():
    if os.path.exists(FIFO_PATH):
        os.unlink(FIFO_PATH)
    os.mkfifo(FIFO_PATH, mode=0o600)

def fifo_producer():
    """Write messages to the FIFO."""
    # open() for writing blocks until a reader opens the other end
    print("Producer: waiting for reader...")
    with open(FIFO_PATH, "wb", buffering=0) as f:
        print("Producer: reader connected, writing messages")
        for i in range(5):
            msg = f"FIFO message {i}\n".encode()
            f.write(msg)
            time.sleep(0.1)
    print("Producer: done")

def fifo_consumer():
    """Read messages from the FIFO."""
    # open() for reading also blocks until a writer appears, so this sleep
    # only makes the producer's "waiting" message print first
    time.sleep(0.05)
    print("Consumer: opening FIFO...")
    with open(FIFO_PATH, "rb", buffering=0) as f:
        while True:
            data = f.read(256)
            if not data:
                break  # writer closed, EOF
            print(f"Consumer received: {data.decode()}", end="")
    print("Consumer: done")

# Two threads keep the demo in one file; two unrelated processes opening
# FIFO_PATH would behave the same way
create_fifo()
t_prod = threading.Thread(target=fifo_producer)
t_cons = threading.Thread(target=fifo_consumer)
t_prod.start()
t_cons.start()
t_prod.join()
t_cons.join()
os.unlink(FIFO_PATH)
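The blocking behaviour of open() on a FIFO can be observed without threads by opening with O_NONBLOCK. A sketch for POSIX systems, using a throwaway path in a fresh temp directory rather than FIFO_PATH: a non-blocking writer cannot open while no reader exists, but a non-blocking reader can, after which the writer's open succeeds:

```python
import errno
import os
import tempfile

# Throwaway FIFO in a fresh temp directory (never clobbers a real one)
fifo_path = os.path.join(tempfile.mkdtemp(), "demo_fifo")
os.mkfifo(fifo_path, 0o600)

# 1. Write-only + O_NONBLOCK with no reader: open fails immediately with ENXIO
try:
    os.open(fifo_path, os.O_WRONLY | os.O_NONBLOCK)
    writer_error = None
except OSError as exc:
    writer_error = exc.errno

# 2. Read-only + O_NONBLOCK succeeds even though no writer exists yet
r_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
# 3. Now that a reader exists, a non-blocking writer can open
w_fd = os.open(fifo_path, os.O_WRONLY | os.O_NONBLOCK)
os.write(w_fd, b"ping")
data = os.read(r_fd, 16)

os.close(w_fd)
os.close(r_fd)
os.unlink(fifo_path)
os.rmdir(os.path.dirname(fifo_path))
print(writer_error == errno.ENXIO, data)  # True b'ping'
```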
multiprocessing.Pipe: High-Level Duplex Pipe
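multiprocessing.Pipe() returns a pair of Connection objects. On Unix, the default duplex=True connects them with a socket pair, so both ends can send and receive; duplex=False uses a single os.pipe(), giving one receive-only end and one send-only end. Connections handle message framing and pickle serialization internally.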
import multiprocessing

def worker(conn):
    """Worker process: receives a task, processes it, sends the result."""
    while True:
        try:
            task = conn.recv()  # blocks until data is available
        except EOFError:
            break  # parent closed the connection
        if task is None:
            break
        result = task ** 2
        conn.send(result)
    conn.close()
    print(f"Worker done (PID {multiprocessing.current_process().pid})")

if __name__ == "__main__":
    # duplex=True (default): both ends can send and receive
    parent_conn, child_conn = multiprocessing.Pipe(duplex=True)
    proc = multiprocessing.Process(target=worker, args=(child_conn,))
    proc.start()
    child_conn.close()  # the parent doesn't use child_conn

    # Send tasks and receive results
    for i in range(5):
        parent_conn.send(i)
        result = parent_conn.recv()
        print(f"Task {i} -> Result {result}")
    parent_conn.send(None)  # signal the worker to exit
    parent_conn.close()
    proc.join()

# Connection methods:
# conn.send(obj)       → pickles obj and sends it as one message
# conn.recv()          → receives one message and unpickles it
# conn.poll(timeout)   → checks whether data is available without blocking
# conn.send_bytes(b)   → sends raw bytes (no pickling)
# conn.recv_bytes()    → receives raw bytes
# conn.fileno()        → returns the underlying FD for use with select
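A quick single-process sketch of poll(), send_bytes()/recv_bytes(), and multiprocessing.connection.wait() (the standard-library helper for waiting on several connections at once); both ends live in one process purely so the example is self-contained:

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait

a, b = Pipe()  # duplex by default

ready_before = b.poll(0.1)      # nothing sent yet: False after waiting 0.1 s
a.send_bytes(b"raw payload")    # bypasses pickle; the bytes travel as one message
ready_after = b.poll(0.1)       # True: a whole message is waiting
payload = b.recv_bytes()

a.send({"op": "ping"})          # send() pickles arbitrary objects
ready = wait([b], timeout=1.0)  # returns the connections that are readable
message = ready[0].recv()

a.close()
b.close()
print(ready_before, ready_after, payload, message)
```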
multiprocessing.Queue: The Producer-Consumer Workhorse
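multiprocessing.Queue is built on a pipe plus a background "feeder" thread: put() appends the object to an in-process buffer, and the feeder thread pickles it and writes it to the pipe. It provides put()/get() semantics and an optional maxsize bound.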
import multiprocessing
import time

def producer(q: multiprocessing.Queue, n_items: int):
    """Produce n_items tasks."""
    for i in range(n_items):
        q.put({"task_id": i, "data": list(range(100))})
        if i % 100 == 0:
            print(f"Producer: put {i}/{n_items}")
    q.put(None)  # sentinel: signals the consumer to stop
    print("Producer: done")

def consumer(q: multiprocessing.Queue):
    """Consume tasks from the queue."""
    count = 0
    while True:
        item = q.get()
        if item is None:
            break
        count += 1
        # Simulate work
        _ = sum(item["data"])
    print(f"Consumer: processed {count} items")

if __name__ == "__main__":
    # maxsize=0 (the default) means unlimited; a positive maxsize makes put()
    # block when the queue is full, applying backpressure to the producer
    q = multiprocessing.Queue(maxsize=1000)
    prod = multiprocessing.Process(target=producer, args=(q, 2000))
    cons = multiprocessing.Process(target=consumer, args=(q,))
    t0 = time.perf_counter()
    prod.start()
    cons.start()
    prod.join()
    cons.join()
    print(f"Total time: {time.perf_counter() - t0:.2f}s")
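With maxsize set, put() can also take a timeout and raise queue.Full instead of blocking forever, which is a convenient way to detect a stalled consumer. A single-process sketch of that backpressure behaviour (the sizes and timeouts are arbitrary):

```python
import multiprocessing
import queue  # multiprocessing.Queue raises queue.Full / queue.Empty

q = multiprocessing.Queue(maxsize=2)
q.put("a")
q.put("b")
try:
    q.put("c", timeout=0.1)  # queue is at maxsize: waits 0.1 s, then gives up
    put_blocked = False
except queue.Full:
    put_blocked = True

first = q.get(timeout=1.0)   # frees one slot
q.put("c", timeout=1.0)      # now succeeds
rest = [q.get(timeout=1.0), q.get(timeout=1.0)]
try:
    q.get(timeout=0.1)       # nothing left
    got_extra = True
except queue.Empty:
    got_extra = False
print(put_blocked, first, rest, got_extra)  # True a ['b', 'c'] False
```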
The Classic Deadlock: join() Before Draining the Queue
# DANGEROUS: joining a process before draining the queue it fed can deadlock
import multiprocessing

def bad_producer(q):
    for i in range(10000):
        q.put(b"x" * 10000)  # ~100 MB total: far more than the pipe buffer holds
    q.put(None)
    # put() itself never blocks on an unbounded Queue: items pile up in an
    # in-process buffer, and the feeder thread blocks once the pipe is full.
    # A process that has put items does not exit until its feeder thread has
    # flushed them all, so join() in the main process waits for a child that
    # is waiting for someone to read the pipe. Deadlock.

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=bad_producer, args=(q,))
    p.start()
    # p.join()  # DEADLOCK: the child cannot exit until its queued data is consumed

    # Fix: drain the queue BEFORE joining the producer process
    items = []
    while True:
        item = q.get()
        if item is None:
            break
        items.append(item)
    p.join()  # safe now: the producer finished and the queue is drained
    print(f"Got {len(items)} items")
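An alternative fix is to drain from a background thread in the parent, so the main thread is free to call join(). A sketch, assuming a Unix system (it requests the fork start method explicitly, so the main module is not re-imported in the child); producer and drain are illustrative names:

```python
import multiprocessing
import threading

def producer(q, n):
    for _ in range(n):
        q.put(b"x" * 10_000)  # ~20 MB total for n=2000: far more than the pipe holds
    q.put(None)               # sentinel

def drain(q, out):
    """Runs in a thread of the parent and keeps the pipe moving."""
    while (item := q.get()) is not None:
        out.append(item)

ctx = multiprocessing.get_context("fork")  # assumption: Unix
q = ctx.Queue()
items = []
p = ctx.Process(target=producer, args=(q, 2_000))
p.start()  # fork before starting any extra thread in the parent
drainer = threading.Thread(target=drain, args=(q, items))
drainer.start()
p.join()   # safe: the drainer empties the pipe, so the child's feeder thread can finish
drainer.join()
print(f"exit code {p.exitcode}, got {len(items)} items")
```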
multiprocessing.shared_memory.SharedMemory (Python 3.8+)
SharedMemory creates a POSIX shared memory region - a named object (a file under /dev/shm on Linux) that multiple processes can map into their address spaces simultaneously. No serialization, no copies across process boundaries.
from __future__ import annotations  # lets the tuple[...] hints below work on Python 3.8

from multiprocessing.shared_memory import SharedMemory
import multiprocessing
import numpy as np

# ─── Creating and Writing Shared Memory ──────────────────────────────────────
def create_shared_array(shape: tuple, dtype: np.dtype) -> tuple[SharedMemory, np.ndarray]:
    """
    Allocate a NumPy array in shared memory.
    Returns (SharedMemory, ndarray) - keep both alive, and delete the array
    before calling shm.close().
    """
    nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
    shm = SharedMemory(create=True, size=nbytes)
    # np.ndarray(..., buffer=...) wraps the shared buffer without copying; unlike
    # np.frombuffer(...).reshape(...), it also tolerates a segment that the
    # platform rounded up to a whole number of pages
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    return shm, arr

def attach_shared_array(name: str, shape: tuple, dtype: np.dtype) -> tuple[SharedMemory, np.ndarray]:
    """
    Attach to an existing shared memory region by name.
    """
    shm = SharedMemory(name=name, create=False)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    return shm, arr

# ─── Worker Pool Sharing a Read-Only Model ───────────────────────────────────
MODEL_SHAPE = (1000, 1000)
MODEL_DTYPE = np.float32

def inference_worker(shm_name: str, task_q: multiprocessing.Queue,
                     result_q: multiprocessing.Queue):
    """
    Worker process: attaches to shared model weights, runs inference.
    The model is read-only - no copies, no serialization.
    """
    # Attach to shared memory: shm_open + mmap, cheap regardless of segment size
    shm, model_weights = attach_shared_array(shm_name, MODEL_SHAPE, MODEL_DTYPE)
    while True:
        task = task_q.get()
        if task is None:
            break
        # Use model_weights directly - this reads the shared physical pages
        input_vec = np.array(task, dtype=np.float32)
        result = model_weights @ input_vec[:MODEL_SHAPE[1]]  # matrix-vector multiply
        result_q.put(float(result.sum()))
    del model_weights  # release the buffer export, or close() raises BufferError
    shm.close()        # detach (don't unlink - the main process owns the segment)

def run_shared_model_pool():
    # Allocate model weights in shared memory (the main process owns them)
    shm, model = create_shared_array(MODEL_SHAPE, MODEL_DTYPE)
    try:
        # Initialize weights (e.g., load from disk)
        model[:] = np.random.rand(*MODEL_SHAPE).astype(MODEL_DTYPE)
        print(f"Model loaded into shared memory: {shm.name} ({shm.size / 1e6:.1f} MB)")

        n_workers = 4
        task_q = multiprocessing.Queue()
        result_q = multiprocessing.Queue()
        # Start workers - each attaches to the shared model, no copying
        workers = [
            multiprocessing.Process(
                target=inference_worker,
                args=(shm.name, task_q, result_q),
                daemon=True,
            )
            for _ in range(n_workers)
        ]
        for w in workers:
            w.start()

        # Submit tasks
        n_tasks = 20
        for i in range(n_tasks):
            task_q.put(np.random.rand(MODEL_SHAPE[1]).tolist())
        # Signal workers to stop
        for _ in range(n_workers):
            task_q.put(None)

        # Collect results before joining the workers
        results = [result_q.get() for _ in range(n_tasks)]
        print(f"Completed {n_tasks} inference tasks")
        print(f"Results sample: {results[:5]}")
        for w in workers:
            w.join()
    finally:
        # Cleanup: drop the view, detach, then unlink to remove the /dev/shm entry
        del model
        shm.close()
        shm.unlink()
        print("Shared memory cleaned up")

if __name__ == "__main__":
    run_shared_model_pool()
Shared Memory Lifecycle and unlink()
from multiprocessing.shared_memory import SharedMemory
import os

# Create: allocates /dev/shm/myapp_data on Linux (FileExistsError if the name is taken)
shm = SharedMemory(create=True, size=4096, name="myapp_data")
shm.buf[:5] = bytes([1, 2, 3, 4, 5])

# Attach by name (normally from another process; the same process works too)
shm2 = SharedMemory(name="myapp_data", create=False)
print(list(shm2.buf[:5]))  # [1, 2, 3, 4, 5]

# close(): unmaps this object's view and closes its file descriptor
# The /dev/shm entry still exists after close()
shm.close()
shm2.close()

# unlink(): deletes the /dev/shm entry (like os.unlink for regular files)
# Call it exactly once, from the owning process
# At the OS level, a segment that is never unlinked persists until reboot
shm.unlink()

# List shared memory regions (Linux exposes them as files under /dev/shm):
for name in os.listdir("/dev/shm"):
    path = f"/dev/shm/{name}"
    size = os.path.getsize(path)
    print(f"  /dev/shm/{name}: {size} bytes")
POSIX Semaphores: Coordinating Access
Semaphores control access to shared resources across processes. On Unix, multiprocessing.Semaphore wraps a POSIX semaphore.
import multiprocessing
import time
import random

TASKS_PER_WORKER = 5
N_WORKERS = 6

def rate_limited_worker(sem: multiprocessing.Semaphore,
                        worker_id: int,
                        results: multiprocessing.Queue):
    """
    Worker that acquires a semaphore slot before doing work.
    Limits concurrent access to a shared resource (e.g., a database).
    """
    for task_num in range(TASKS_PER_WORKER):
        # Acquire: blocks while the semaphore count is 0
        sem.acquire()
        try:
            # At most `max_concurrent` workers are here simultaneously
            print(f"Worker {worker_id} doing task {task_num} (PID {multiprocessing.current_process().pid})")
            time.sleep(random.uniform(0.1, 0.3))  # simulate a DB query (0.2 s on average)
            results.put((worker_id, task_num))
        finally:
            # Release: increments the semaphore, letting another worker in
            sem.release()

def run_rate_limited_pool():
    max_concurrent = 2  # allow at most 2 concurrent workers
    sem = multiprocessing.Semaphore(max_concurrent)
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(
            target=rate_limited_worker,
            args=(sem, i, results),
        )
        for i in range(N_WORKERS)
    ]
    t0 = time.perf_counter()
    for w in workers:
        w.start()
    # Collect exactly one result per task before joining, rather than joining
    # first and polling results.empty(), which is not reliable across processes
    n_tasks = N_WORKERS * TASKS_PER_WORKER
    all_results = [results.get() for _ in range(n_tasks)]
    for w in workers:
        w.join()
    elapsed = time.perf_counter() - t0
    expected = n_tasks * 0.2 / max_concurrent  # 30 tasks x 0.2 s / 2 slots = 3.0 s
    print(f"\nAll {len(all_results)} tasks completed in {elapsed:.2f}s")
    print(f"(With {max_concurrent} concurrent slots, ~{expected:.1f}s expected for {n_tasks} tasks at ~0.2s each)")

if __name__ == "__main__":
    run_rate_limited_pool()
BoundedSemaphore for Producer Rate Limiting
import multiprocessing
import time

def data_producer(sem: multiprocessing.BoundedSemaphore,
                  data_q: multiprocessing.Queue):
    """
    Producer: uses the semaphore as a pool of credits.
    Each item produced consumes one credit; the consumer hands it back.
    This limits how far ahead of the consumer the producer can get.
    """
    for i in range(20):
        sem.acquire()  # blocks once buffer_size items are still unconsumed
        data_q.put(f"item_{i}")
        print(f"Produced item_{i}")
    data_q.put(None)

def data_consumer(sem: multiprocessing.BoundedSemaphore,
                  data_q: multiprocessing.Queue):
    """
    Consumer: processes items and releases semaphore credits,
    allowing the producer to continue.
    """
    while True:
        item = data_q.get()
        if item is None:
            break
        time.sleep(0.1)  # simulate a slow consumer
        print(f"  Consumed {item}")
        sem.release()  # the producer may now produce another item

if __name__ == "__main__":
    # At most 5 items in flight ahead of the consumer
    buffer_size = 5
    sem = multiprocessing.BoundedSemaphore(buffer_size)
    q = multiprocessing.Queue()
    prod = multiprocessing.Process(target=data_producer, args=(sem, q))
    cons = multiprocessing.Process(target=data_consumer, args=(sem, q))
    prod.start()
    cons.start()
    prod.join()
    cons.join()
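The difference between the two semaphore types (discussed further in Q4 below) shows up as soon as release() is called once too often. A single-process sketch:

```python
import multiprocessing

plain = multiprocessing.Semaphore(1)
plain.release()  # no upper bound: the count silently becomes 2
# Two acquires now succeed without blocking: one spurious credit was created
extra_credit = plain.acquire(timeout=0.1) and plain.acquire(timeout=0.1)

bounded = multiprocessing.BoundedSemaphore(1)
try:
    bounded.release()  # would push the count above its initial value of 1
    over_release = "allowed"
except ValueError:
    over_release = "ValueError"
print(extra_credit, over_release)  # True ValueError
```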
mmap for Anonymous Shared Memory (Alternative Approach)
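On Unix, mmap.mmap(-1, size) creates an anonymous mapping whose flags default to MAP_SHARED, so a parent and the children it creates with fork() share the same pages. This is a lower-level alternative to SharedMemory: the mapping has no name, so only the creator and its forked descendants can reach it.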
import mmap
import os
import struct

# Anonymous shared memory, visible to the parent and to children created by fork()
shm = mmap.mmap(-1, 4096)  # -1 = anonymous, no file backing; MAP_SHARED by default on Unix

# Write structured data using struct
struct_fmt = "=iqd"  # native byte order, standard sizes, no padding: int32, int64, double
struct_size = struct.calcsize(struct_fmt)
shm[:struct_size] = struct.pack(struct_fmt, 0, 0, 0.0)  # initialize

pid = os.fork()
if pid == 0:
    # Child: update the shared record
    count, total, avg = struct.unpack(struct_fmt, shm[:struct_size])
    for i in range(100):
        count += 1
        total += i
        avg = total / count
        shm[:struct_size] = struct.pack(struct_fmt, count, total, avg)
    # No flush() needed: stores to a MAP_SHARED mapping are visible to the parent
    # directly (flush()/msync matters only for writing back file-backed mappings)
    os._exit(0)
else:
    os.waitpid(pid, 0)  # wait for the child to finish before reading
    count, total, avg = struct.unpack(struct_fmt, shm[:struct_size])
    print(f"Child computed: count={count}, total={total}, avg={avg:.2f}")
shm.close()
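The example above is safe only because a single child writes. With several writers, each read-modify-write of the shared record needs a lock. A sketch, assuming Unix and the fork start method (so the children inherit the anonymous mapping); add_many is an illustrative name and the one-counter layout is arbitrary:

```python
import mmap
import multiprocessing
import struct

COUNTER = struct.Struct("=q")  # one int64 at offset 0

def add_many(buf: mmap.mmap, lock, n: int) -> None:
    for _ in range(n):
        with lock:  # the read-modify-write must not interleave with other processes
            (value,) = COUNTER.unpack_from(buf, 0)
            COUNTER.pack_into(buf, 0, value + 1)

ctx = multiprocessing.get_context("fork")  # assumption: Unix; children inherit the mapping
buf = mmap.mmap(-1, mmap.PAGESIZE)          # anonymous, zero-filled, MAP_SHARED by default
lock = ctx.Lock()
procs = [ctx.Process(target=add_many, args=(buf, lock, 1_000)) for _ in range(4)]
for p in procs:
    p.start()
for p in procs:
    p.join()
(final,) = COUNTER.unpack_from(buf, 0)
buf.close()
print(final)  # 4000
```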
multiprocessing.Value and multiprocessing.Array
For simple typed shared variables, multiprocessing.Value and multiprocessing.Array are more ergonomic than raw SharedMemory. They use ctypes to define the type and, by default (lock=True), come wrapped with a lock that get_lock() returns.
import multiprocessing
import ctypes
import numpy as np

def increment_counter(shared_counter, n: int):
    for _ in range(n):
        # += is a read-modify-write; without the lock, concurrent updates are lost
        with shared_counter.get_lock():
            shared_counter.value += 1

def fill_array(arr, start: int, end: int, value: float):
    for i in range(start, end):
        arr[i] = value  # disjoint index ranges, so the two writers never overlap

if __name__ == "__main__":
    # Shared counter with lock
    counter = multiprocessing.Value(ctypes.c_int64, 0)
    procs = [
        multiprocessing.Process(target=increment_counter, args=(counter, 10000))
        for _ in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"Counter: {counter.value}")  # 40000

    # Shared array - a fixed-size array of ctypes values
    shared_arr = multiprocessing.Array(ctypes.c_double, 100)
    p1 = multiprocessing.Process(target=fill_array, args=(shared_arr, 0, 50, 1.0))
    p2 = multiprocessing.Process(target=fill_array, args=(shared_arr, 50, 100, 2.0))
    p1.start(); p2.start()
    p1.join(); p2.join()

    arr = np.frombuffer(shared_arr.get_obj(), dtype=np.float64)  # zero-copy view
    print(f"First half mean: {arr[:50].mean()}")   # 1.0
    print(f"Second half mean: {arr[50:].mean()}")  # 2.0
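Taking the lock once per increment, as increment_counter does, turns every += into a lock round trip. When only the final total matters, a worker can accumulate locally and take the lock once per batch. A sketch under that assumption (fork start method, a hypothetical add_batched helper, and an arbitrary batch size):

```python
import multiprocessing

def add_batched(shared_counter, n: int, batch: int = 1_000) -> None:
    """Add n in chunks, taking the lock once per chunk instead of once per unit."""
    done = 0
    while done < n:
        step = min(batch, n - done)
        with shared_counter.get_lock():
            shared_counter.value += step
        done += step

ctx = multiprocessing.get_context("fork")  # assumption: Unix start method
counter = ctx.Value("q", 0)                # "q" = signed 64-bit, like ctypes.c_int64
procs = [ctx.Process(target=add_batched, args=(counter, 10_000)) for _ in range(4)]
for p in procs:
    p.start()
for p in procs:
    p.join()
print(counter.value)  # 40000
```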
Message Passing Patterns
Request-Reply
import multiprocessing
from multiprocessing.connection import Connection

def rpc_worker(conn: Connection):
    """Handle RPC requests from the parent."""
    while True:
        try:
            request = conn.recv()
        except EOFError:
            break
        if request is None:
            break
        # Dispatch on request type
        method = request.get("method")
        args = request.get("args", [])
        if method == "add":
            response = {"result": sum(args), "error": None}
        elif method == "multiply":
            result = 1
            for x in args:
                result *= x
            response = {"result": result, "error": None}
        else:
            response = {"result": None, "error": f"Unknown method: {method}"}
        conn.send(response)
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe(duplex=True)
    worker = multiprocessing.Process(target=rpc_worker, args=(child_conn,))
    worker.start()
    child_conn.close()

    # RPC calls
    for request in [
        {"method": "add", "args": [1, 2, 3, 4, 5]},
        {"method": "multiply", "args": [2, 3, 4]},
        {"method": "sqrt", "args": [16]},
    ]:
        parent_conn.send(request)
        response = parent_conn.recv()
        print(f"Request: {request['method']}({request['args']}) -> {response}")
    parent_conn.send(None)
    parent_conn.close()
    worker.join()
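The request-reply loop above waits forever if the worker never answers. Connection.poll(timeout) gives the caller a deadline. A sketch, assuming Unix and the fork start method; slow_echo and call are hypothetical names, and slow_echo deliberately ignores the request "hang":

```python
import multiprocessing
from multiprocessing.connection import Connection

def slow_echo(conn: Connection) -> None:
    """Hypothetical worker: echoes each request, but never answers 'hang'."""
    while True:
        try:
            request = conn.recv()
        except EOFError:
            break
        if request is None:
            break
        if request != "hang":
            conn.send(request)
    conn.close()

def call(conn: Connection, request, timeout: float):
    """Send one request and wait at most `timeout` seconds for the reply."""
    conn.send(request)
    if not conn.poll(timeout):
        raise TimeoutError(f"no reply to {request!r} within {timeout}s")
    return conn.recv()

ctx = multiprocessing.get_context("fork")  # assumption: Unix start method
parent_conn, child_conn = ctx.Pipe()
worker = ctx.Process(target=slow_echo, args=(child_conn,))
worker.start()
child_conn.close()

reply = call(parent_conn, "ping", timeout=1.0)
try:
    call(parent_conn, "hang", timeout=0.2)
    timed_out = False
except TimeoutError:
    timed_out = True
parent_conn.send(None)
parent_conn.close()
worker.join()
print(reply, timed_out)  # ping True
```

One caveat of this simple design: if a reply arrives after its deadline, the next call() would read it as its own answer. RPC layers usually tag each request with an ID and discard stale replies.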
Fan-Out Pattern (Work Queue)
import multiprocessing

def fan_out_worker(worker_id: int, task_q: multiprocessing.Queue,
                   result_q: multiprocessing.Queue):
    while True:
        task = task_q.get()
        if task is None:
            task_q.put(None)  # pass the poison pill on to the next worker
            break
        result = {"worker": worker_id, "task": task, "result": task * 2}
        result_q.put(result)

if __name__ == "__main__":
    N_WORKERS = 4
    task_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=fan_out_worker, args=(i, task_q, result_q))
        for i in range(N_WORKERS)
    ]
    for w in workers:
        w.start()

    # Dispatch 100 tasks
    for i in range(100):
        task_q.put(i)
    task_q.put(None)  # the first poison pill cascades through all workers

    # Collect results before joining
    results = []
    for _ in range(100):
        results.append(result_q.get())
    for w in workers:
        w.join()
    print(f"Fan-out complete: {len(results)} results")
    print(f"Workers used: {set(r['worker'] for r in results)}")
Pipeline Pattern
import multiprocessing

def stage1_fetch(out_q: multiprocessing.Queue, n: int):
    """Stage 1: generate data."""
    for i in range(n):
        out_q.put({"raw": i * 10})
    out_q.put(None)

def stage2_transform(in_q: multiprocessing.Queue, out_q: multiprocessing.Queue):
    """Stage 2: transform data."""
    while True:
        item = in_q.get()
        if item is None:
            out_q.put(None)  # forward the sentinel downstream
            break
        out_q.put({"transformed": item["raw"] ** 2})

def stage3_store(in_q: multiprocessing.Queue, result: list):
    """Stage 3: store results."""
    count = 0
    while True:
        item = in_q.get()
        if item is None:
            break
        result.append(item["transformed"])
        count += 1
    print(f"Stage 3: stored {count} items")

if __name__ == "__main__":
    # Bounded queues keep a fast stage from racing far ahead of a slow one
    q1 = multiprocessing.Queue(maxsize=50)
    q2 = multiprocessing.Queue(maxsize=50)
    # Hold the Manager explicitly so its lifetime (and shutdown) is clear
    with multiprocessing.Manager() as manager:
        results = manager.list()  # proxy to a list living in the manager's server process
        p1 = multiprocessing.Process(target=stage1_fetch, args=(q1, 200))
        p2 = multiprocessing.Process(target=stage2_transform, args=(q1, q2))
        p3 = multiprocessing.Process(target=stage3_store, args=(q2, results))
        p1.start(); p2.start(); p3.start()
        p1.join(); p2.join(); p3.join()
        print(f"Pipeline results: {len(results)} items, first 5: {list(results[:5])}")
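When a stage runs several workers in parallel, a single forwarded sentinel is no longer enough: the downstream stage must wait for one sentinel per upstream worker. A sketch of that counting approach, assuming Unix and the fork start method; square_worker and collect are illustrative names:

```python
import multiprocessing

def square_worker(in_q, out_q):
    """One of several parallel stage-2 workers."""
    while (item := in_q.get()) is not None:
        out_q.put(item * item)
    out_q.put(None)  # each worker announces its own exit downstream

def collect(in_q, n_upstream: int) -> list:
    """Stage 3 stops only after it has seen one sentinel per upstream worker."""
    results, finished = [], 0
    while finished < n_upstream:
        item = in_q.get()
        if item is None:
            finished += 1
        else:
            results.append(item)
    return results

N_WORKERS = 3
ctx = multiprocessing.get_context("fork")  # assumption: Unix start method
q1 = ctx.Queue(maxsize=50)                 # bounded: the main process feeds it
q2 = ctx.Queue()                           # unbounded: see the note below
workers = [ctx.Process(target=square_worker, args=(q1, q2)) for _ in range(N_WORKERS)]
for w in workers:
    w.start()
for i in range(200):
    q1.put(i)
for _ in range(N_WORKERS):
    q1.put(None)  # one sentinel per stage-2 worker
results = collect(q2, N_WORKERS)  # drain before joining
for w in workers:
    w.join()
print(len(results), sum(results))  # 200 2646700
```

q2 is deliberately unbounded here: if both queues were bounded, the workers could block on a full q2 while the main process is still blocked feeding q1 and has not yet started draining q2.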
Interview Q&A
Q1: Why is multiprocessing.shared_memory.SharedMemory so much faster than passing data through a multiprocessing.Queue?
multiprocessing.Queue internally uses a multiprocessing.Pipe, which is backed by an OS pipe or socket. When you call q.put(arr) where arr is a NumPy array, the queue's feeder thread serializes the array with pickle, which for a NumPy array means copying its entire data buffer into a bytes object, and then sends those bytes through the pipe, whose kernel buffer holds only ~64 KB at a time on Linux, so the transfer proceeds in many small chunks. The receiving process reads the bytes back out of the pipe and unpickles them into a new array. For a 500 MB array that is roughly: pickling (~2-3 seconds), writing into the pipe (a user-to-kernel copy, ~1-2 seconds), reading from the pipe (a kernel-to-user copy, ~1-2 seconds), and unpickling (~2-3 seconds). Total: ~8 seconds and 4 copies of the data.
SharedMemory maps the same physical RAM pages into both processes' address spaces. Writing to shm.buf writes directly to those pages. Reading from shm.buf in the other process reads from those same pages. No serialization, no deserialization, no pipe buffer, no kernel copies. The remaining costs are the shm_open/mmap(2) calls to attach (microseconds) and page faults on first access, since the kernel maps pages into each process's virtual address space on demand: a 500 MB segment with 4 KB pages means about 128,000 faults per process. Actually writing or reading the data is still bounded by memory bandwidth (roughly 25 ms for 500 MB at the ~20 GB/s in the table above), but that is the cost of touching 500 MB at all, not the cost of IPC. With serialization and copying gone, large-array handoffs typically speed up by two orders of magnitude or more.
Q2: What is the multiprocessing.Queue deadlock pattern and how do you avoid it?
The deadlock occurs when: (1) a child process puts more data on a queue than the underlying pipe can hold, so the queue's background feeder thread blocks writing to the full pipe (with an unbounded queue, put() itself returns immediately, because items first go into an in-process buffer); (2) a process that has put items on a queue will not exit until its feeder thread has flushed all of them into the pipe; and (3) the main process calls p.join() on that child before reading from the queue. Neither side can proceed: the main process waits for the child to exit, the child waits for its feeder thread, and the feeder thread waits for the pipe to drain, which never happens because the main process is blocked in join() and never calls q.get().
The fix is always to drain the queue before calling join() on processes that feed it. A safe pattern: use a background drain thread in the main process, or structure the code so the q.get() loops run before any p.join(). Setting maxsize on the Queue applies backpressure, limiting how far ahead the producer can run, but it does not prevent the deadlock if the main process joins before draining. Queue.cancel_join_thread() lets the child exit without flushing, at the cost of losing any unflushed data. The only reliable fix is to drain before joining.
Q3: Explain the unlink() semantics of POSIX shared memory and what happens if you forget to call it.
POSIX shared memory objects live in /dev/shm (on Linux) as filesystem entries backed by tmpfs. SharedMemory.close() is analogous to close(fd) - it detaches the current process's mapping by calling munmap(2). The /dev/shm/<name> entry persists; other processes can still attach to it. SharedMemory.unlink() is analogous to os.unlink() on a regular file - it removes the /dev/shm/<name> entry. After unlink(), no new processes can attach (the name is gone), but existing mappings in other processes remain valid until they call close(). The kernel reference-counts the underlying object and frees the memory only when all mappings are closed and the name is unlinked.
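If nothing ever calls unlink(), the segment persists in /dev/shm until reboot. (Python's resource tracker may unlink segments it still knows about when the program exits, with a warning about leaked shared_memory objects, but that is a safety net, not a cleanup strategy.) On a server with frequent restarts, orphaned /dev/shm entries accumulate. Each entry consumes RAM-backed tmpfs space. In a container environment, /dev/shm often has a small size limit (64 MB by default in Docker), and orphaned entries fill it quickly. Because tmpfs allocates pages lazily, running out often surfaces not as a clean error from SharedMemory(create=True) but as a SIGBUS ("Bus error") when a process first writes to a new segment. The fix: in the owning process, wrap the segment's lifetime in try/finally so unlink() runs even on exceptions, or create segments through multiprocessing.managers.SharedMemoryManager, which unlinks every segment it created when it shuts down (for example, on leaving its with block).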
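A sketch of the SharedMemoryManager route, assuming Unix (the manager's server process is started with the fork context explicitly): the segment is created through the manager, and leaving the with block is enough to remove it:

```python
import multiprocessing
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory

ctx = multiprocessing.get_context("fork")  # assumption: Unix; the manager runs in a child process
with SharedMemoryManager(ctx=ctx) as smm:
    shm = smm.SharedMemory(size=1024)      # created here, tracked by the manager
    name = shm.name
    shm.buf[:3] = b"abc"
    shm.close()                            # this process detaches; the manager still tracks it
# Leaving the with block shuts the manager down, which unlinks every segment it tracked

try:
    SharedMemory(name=name)
    still_there = True
except FileNotFoundError:
    still_there = False
print(still_there)  # False
```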
Q4: How do POSIX semaphores work under the hood, and what is the difference between Semaphore and BoundedSemaphore?
A POSIX semaphore is a counter with two atomic operations: sem_wait() (decrement, blocking while the counter is 0) and sem_post() (increment, waking a blocked waiter). On Unix, multiprocessing.Semaphore is built on sem_open(), i.e., a named semaphore. On Linux, glibc backs a named semaphore with a small file in /dev/shm that each process maps, so the counter itself lives in shared memory rather than inside the kernel, and sem_wait()/sem_post() update it with atomic CPU instructions, which keeps them race-free across processes. The kernel gets involved only through the futex (fast user-space mutex) syscall: when a caller must sleep because the counter is 0, or when sem_post() has to wake a sleeper. The uncontended case never leaves user space.
Semaphore(n) allows the count to go above n via repeated release() calls - there is no upper bound check. This can mask bugs where release() is called more times than acquire(), creating spurious "credits." BoundedSemaphore(n) raises ValueError if release() would push the count above the initial value n. Use BoundedSemaphore when the semantics require exactly one release() per acquire() - resource pools, rate limiters, and producer-consumer buffers where the semaphore count represents a meaningful quantity.
Q5: What are the synchronization challenges when multiple processes write to shared memory simultaneously, and how do you address them?
Shared memory itself provides no synchronization - it is raw memory. If two processes write to overlapping regions simultaneously, the result is a data race: reads and writes of values larger than the machine word size (8 bytes on 64-bit) are not atomic. A 64-byte struct being updated by one process can be read mid-update by another, yielding a torn read (some fields from the old state, some from the new state).
The standard approach is to pair shared memory with a synchronization primitive: a multiprocessing.Lock (on Unix, itself a POSIX semaphore with a maximum count of 1) or a multiprocessing.Semaphore. For producer-consumer patterns, a common pattern is: the producer writes to a "staging" region of shared memory, then increments a semaphore; the consumer waits on the semaphore, reads the data, and signals back. For reader-writer workloads with many readers and few writers, note that multiprocessing provides no reader-writer lock (an RLock is merely reentrant and still admits one holder at a time); a custom one built from a Lock and a shared counter lets readers proceed concurrently. In practice, many ML inference systems sidestep the problem by making the shared region read-only after initialization: weights are written once at startup, and workers that attach only after initialization has finished (for example, because they are started afterwards, as in the worker-pool example above) can read without any locking, since nothing is written concurrently.
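A sketch of the reader-writer lock described above, built from a Lock and a raw shared counter. It is a readers-preference design (writers can starve under a steady stream of readers), ReadWriteLock is not a standard-library class, and the demo exercises it from a single process only to keep it self-contained:

```python
import multiprocessing

class ReadWriteLock:
    """Readers-preference RW lock built from multiprocessing primitives (illustrative sketch)."""

    def __init__(self, ctx=multiprocessing):
        self._readers = ctx.Value("i", 0, lock=False)  # raw shared int, guarded by _readers_lock
        self._readers_lock = ctx.Lock()
        self._write_lock = ctx.Lock()  # held by one writer, or on behalf of all active readers

    def acquire_read(self) -> None:
        with self._readers_lock:
            self._readers.value += 1
            if self._readers.value == 1:  # the first reader locks writers out
                self._write_lock.acquire()

    def release_read(self) -> None:
        with self._readers_lock:
            self._readers.value -= 1
            if self._readers.value == 0:  # the last reader lets writers back in
                self._write_lock.release()

    def acquire_write(self, timeout=None) -> bool:
        return self._write_lock.acquire(timeout=timeout)

    def release_write(self) -> None:
        self._write_lock.release()

rw = ReadWriteLock()
rw.acquire_read()
rw.acquire_read()  # readers share access
blocked_writer = not rw.acquire_write(timeout=0.1)
rw.release_read()
rw.release_read()
writer_got_in = rw.acquire_write(timeout=1.0)
rw.release_write()
print(blocked_writer, writer_got_in)  # True True
```

Because a multiprocessing.Lock is semaphore-based, the last reader may release the write lock even if a different reader acquired it; the lock and counter can be passed to worker processes like any other multiprocessing primitive.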
